Visualisation des données

In [1]:
import visdom
import numpy as np
import sys
sys.path.append("../TimeSeriesTools")
import utils
from cerberus import Validator
import chart_studio.plotly as py
import plotly.express as px
import plotly.tools as tls
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import pandas as pd
from datetime import datetime

Connexion à la base MongoDB

In [2]:
import mongodb_utils
db_host= 'localhost'
port = '28018'
db_name='TimeSeriesBench'
mongodb_client = mongodb_utils.mongodb_connect(db_host, port)
Trying to connect to MongoDB server: localhost on port: 28018
In [3]:
import kairosdb_utils
global kairosdb_server 
kairosdb_server = "http://localhost:6060"
In [4]:
import influxdb_utils
db_host= 'localhost'
port = '9086'
db_name='TimeSeriesBench'
influxdb_client = influxdb_utils.influxdb_connect(db_host, port)
Trying to connect to InfluxDB server without proxy: localhost on port: 9086
connection sucess!
In [5]:
import warp10_utils
global warp10_server 
warp10_server = "http://localhost:7070"

Lecture des données dans la base

Cette fonction est utilisée pour lire tous les documents dans la base de mongodb, mais il reste un problème : mongodb n'est pas une base de données spécialement conçue pour les données de séries temporaires, donc c'est possible d'exister certains données dédoublées avec même timestamps. Soit on définit un règle avant le stockage de mongodb pour supprimer ou modifier les données dédboulées, ou soit on le néglige.

In [6]:
def get_collection_scheme(db_name,scheme_name):
    db = mongodb_client[db_name]
    schemes_coll = db['schemes']
    scheme = schemes_coll.find({"name":scheme_name})
    return scheme
In [7]:
def mongodb_find_all_data(db_name,coll_name,scheme):
    data = mongodb_utils.get_all_data(mongodb_client,db_name,coll_name,scheme)
    return data
In [8]:
def mongodb_find_data_select_by_tags(db_name,coll_name,tags,scheme):
    data = mongodb_utils.get_data_select_by_tags(mongodb_client,db_name,coll_name,tags,scheme)
    return data
In [9]:
def kairosdb_find_all_data(db_name,coll_name,scheme):
    data = kairosdb_utils.get_all_data(kairosdb_server,db_name,coll_name,scheme)
    return data
In [10]:
def kairosdb_find_data_select_by_tags(db_name,coll_name,tags,scheme):
    data = kairosdb_utils.get_data_select_by_tags(kairosdb_server,db_name,coll_name,tags,scheme)
    return data
In [11]:
def influxdb_find_all_data(db_name,coll_name,scheme):
    data = influxdb_utils.get_all_data(influxdb_client,db_name,coll_name,scheme)
    return data
In [12]:
def influxdb_find_data_select_by_tags(db_name,coll_name,tags,scheme):
    data = influxdb_utils.get_data_select_by_tags(influxdb_client,db_name,coll_name,tags,scheme)
    return data
In [13]:
def warp10_find_data_select_by_tags(db_name,coll_name,tags,scheme):
    import json
    res = warp10_utils.get_data_select_by_tags(warp10_server,db_name,coll_name,tags,scheme)    
            
    nb_docs = 0
    data_list = []
    for r in res :
        for d in r:
            nb_docs += len(d['v'])
            for v in d['v']:
                data_list.append([v[0], json.loads(v[4])])
    tagname = 'TAG'
    for i,(k,v) in enumerate(tags.items()):
        tagname = tagname+'.'+v

    cols = [ k for k in scheme.keys()]
    results = []
    for d in data_list:
        data = d[0]
        str_value = str(id)+';'+str(d[0])+';'+tagname+';'+str(d[1][0])+';'+str(d[1][1])
        values = str_value.split(';')
        results.append({ cols[i]:values[i] for i in range(len(cols))})
    return results

Conversion les dates des données à unix timestamp

Les objects de datetime ne peuvent pas directemant utilisés pour tracer l'axis X, donc il faut d'abord convertir les datetime à unix timestamp.

In [14]:
def to_unix_time(dt):
    epoch =  datetime.utcfromtimestamp(7200)
    return (dt - epoch).total_seconds() * 1000
In [15]:
def str_to_unix(date):
    dt = datetime.strptime(date, '%d/%m/%Y %H:%M:%S')
    epoch = datetime.utcfromtimestamp(0)
    return int((dt - epoch).total_seconds()) * 1000

Nettoyage des données dans la base

Normalement cette étape est faite avant le stockage des données, vu que on ne sait pas encore les règles détaillés pour la validation les données, on insère quand même les données corruptibles dans la base.
Dans cet exemple, la source des données contient une ligne de données qui manque trois champs pour tester la validation des données.
Donc avant visualiser les courbes, on trouve d'abord les données corruptibles et les supprime. Les règles de validation temporaires sont :

  • tous les champs sont présentés
  • les types des champs sont string sauf "_id" car cerberus ne connaît pas le type bson.objectid.ObjectId
  • tous les champs ne sont pas string vide
In [16]:
def clean_data(scheme,data):
    from cerberus import Validator
    v = Validator(scheme)
    for index,item in enumerate(data,start=0):
        res = v.validate(item)
        if (res == False):
            print("corrupt data in line :",index,", error : ",v.errors)
            del data[index]

Connecter à la base de test et affichhe le nombre total des données trouvées dans la collection désirée

In [17]:
scheme = get_collection_scheme(db_name,'SmartGrid')
scheme[0]
Out[17]:
{'_id': ObjectId('5ef1d7e703b14cb959702ca3'),
 'name': 'SmartGrid',
 'value': {'timestamp': {'type': 'string', 'required': True, 'empty': False},
  'tagname': {'type': 'string', 'required': True, 'empty': False},
  'value': {'type': 'string', 'required': True, 'empty': False},
  'quality': {'type': 'string', 'required': True, 'empty': False}}}
In [18]:
%%time
coll_name='SmartGridCryolite20190101OneMonthV10000'
data = mongodb_find_all_data(db_name,coll_name,scheme[0]['value'])
print("number of docs",len(data))
769598  documents found
number of docs 769598
CPU times: user 3.95 s, sys: 282 ms, total: 4.23 s
Wall time: 4.64 s
In [19]:
%%time
coll_name='SmartGridCryolite20190101OneMonthV10000'
tags = { 'Buiding' : 'CRY', 'Device' : 'CENTRALE_SOLAIRE', 'Measure' : 'CRY_act_prod_pow' }
data = mongodb_find_data_select_by_tags(db_name,coll_name,tags,scheme[0]['value'])
print("number of docs",len(data))
27584  documents found
number of docs 27584
CPU times: user 264 ms, sys: 70.2 ms, total: 335 ms
Wall time: 670 ms
In [20]:
%%time
coll_name='SmartGridCryolite20190101OneMonthV10000'
tags = { 'Buiding' : 'CRY', 'Device' : 'CENTRALE_SOLAIRE', 'Measure' : 'CRY_act_prod_pow' }
data = kairosdb_find_data_select_by_tags(db_name,coll_name,tags,scheme[0]['value'])
print("number of docs",len(data))
number of docs 27584
CPU times: user 145 ms, sys: 14.8 ms, total: 160 ms
Wall time: 852 ms
In [21]:
%%time
coll_name='SmartGridCryolite20190101OneMonthV10000'
tags = { 'Buiding' : 'CRY', 'Device' : 'CENTRALE_SOLAIRE', 'Measure' : 'CRY_act_prod_pow' }
data = influxdb_find_data_select_by_tags(db_name,coll_name,tags,scheme[0]['value'])
print("number of docs",len(data))
number of docs 27584
CPU times: user 2.33 s, sys: 190 ms, total: 2.52 s
Wall time: 6.95 s
In [22]:
%%time
coll_name='SmartGridCryolite20190101OneMonthV10000'
tags = { 'Buiding' : 'CRY', 'Device' : 'CENTRALE_SOLAIRE', 'Measure' : 'CRY_act_prod_pow' }
data = warp10_find_data_select_by_tags(db_name,coll_name,tags,scheme[0]['value'])
print("number of docs",len(data))
Status code: 200
number of docs 27584
CPU times: user 327 ms, sys: 18.2 ms, total: 345 ms
Wall time: 686 ms

Nettoyer les données récupérées

Les informations des données corruptibles seront affichées dans la console :

  • position (nbr de ligne)
  • nom de champs qui a des problème
  • message d'erreur, pourquoi cette partie de données n'est pas validée
In [23]:
%%time
clean_data(scheme[0]['value'],data)
CPU times: user 11 s, sys: 2.59 ms, total: 11 s
Wall time: 11 s
In [19]:
%%time
df = pd.DataFrame(data)
df[0:5]
CPU times: user 926 ms, sys: 15 ms, total: 941 ms
Wall time: 939 ms
Out[19]:
_id timestamp tagname value quality
0 5ef3400c931b02970f441bd6 01/01/2019 09:15:12 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 1.000000000 100.0
1 5ef3400c931b02970f441bd7 01/01/2019 09:15:18 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 0.000000000 100.0
2 5ef3400c931b02970f441bd8 01/01/2019 09:15:37 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 1.000000000 100.0
3 5ef3400c931b02970f441bd9 01/01/2019 09:15:43 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 0.000000000 100.0
4 5ef3400c931b02970f441bda 01/01/2019 09:15:53 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 1.000000000 100.0
In [20]:
df['value']
Out[20]:
0          1.000000000
1          0.000000000
2          1.000000000
3          0.000000000
4          1.000000000
              ...     
769593     2.000000000
769594     0.000000000
769595    -1.000000000
769596     2.000000000
769597     0.000000000
Name: value, Length: 769598, dtype: object
In [21]:
df.tagname.unique()
Out[21]:
array(['CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow',
       'CRY.CENTRALE_SOLAIRE.CRY_app_prod_pow',
       'CRY.CENTRALE_SOLAIRE.CRY_rea_prod_pow',
       'CRY.TGBT_NORMAL.CRY_act_cons_pow',
       'CRY.TGBT_NORMAL.CRY_app_cons_pow',
       'CRY.TGBT_NORMAL.CRY_rapp_cons_ene',
       'CRY.TGBT_NORMAL.CRY_rea_cons_pow'], dtype=object)
In [22]:
df2 = df.loc[df['tagname'] == 'CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow']
df2[0:10]
Out[22]:
_id timestamp tagname value quality
0 5ef3400c931b02970f441bd6 01/01/2019 09:15:12 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 1.000000000 100.0
1 5ef3400c931b02970f441bd7 01/01/2019 09:15:18 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 0.000000000 100.0
2 5ef3400c931b02970f441bd8 01/01/2019 09:15:37 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 1.000000000 100.0
3 5ef3400c931b02970f441bd9 01/01/2019 09:15:43 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 0.000000000 100.0
4 5ef3400c931b02970f441bda 01/01/2019 09:15:53 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 1.000000000 100.0
5 5ef3400c931b02970f441bdb 01/01/2019 09:15:58 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 0.000000000 100.0
6 5ef3400c931b02970f441bdc 01/01/2019 09:16:13 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 1.000000000 100.0
7 5ef3400c931b02970f441bdd 01/01/2019 09:16:23 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 0.000000000 100.0
8 5ef3400c931b02970f441bde 01/01/2019 09:16:53 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 1.000000000 100.0
9 5ef3400c931b02970f441bdf 01/01/2019 09:16:59 CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow 0.000000000 100.0

Convertir les string d'heure à datetime object

In [23]:
axisX = df['timestamp']
In [24]:
# convertir cas MongoDB
df['timestamp'] = df['timestamp'].apply(str_to_unix)

Préparer la figure basée sur la bibliothèque plotly

Pour éviter les données dédoublées, on recupère les premières 5000 lignes comme la source de données du plot.
Dans cet plot les courbes partagent les axis-X et axis-Y, donc on peut observer que les tendances des courbes ne sont pas très claire parce que les plages de valeur des colonnes de données sont très variées.

In [25]:
# Cas MongoDB
x = df2['timestamp'].values
nb_pts = len(x)
nb_pts
Out[25]:
27584
In [26]:
y_value=[float(item) for item in df2["value"]]
y_quality=[float(item)/100 for item in df2["quality"]]
In [27]:
x = df['timestamp'].values
nb_pts = len(x)
nb_pts
Out[27]:
769598
In [28]:
y_value=[float(item) for item in df["value"]]
y_quality=[float(item)/100 for item in df["quality"]]
In [29]:
%matplotlib inline
nb_pts=25000
fig = go.Figure()
fig.add_trace(go.Scatter(
                x=axisX[0:nb_pts-1],
                y=y_value[0:nb_pts-1],
                name="value",
                line_color='deepskyblue',
                opacity=0.8))

fig.add_trace(go.Scatter(
                x=axisX[0:nb_pts-1],
                y=y_quality[0:nb_pts-1],
                name="quality",
                line_color='dimgray',
                opacity=0.8))

# Use date string to set xaxis range
fig.update_layout(xaxis_range=[x[0],
                               x[nb_pts-1]],
                  title_text="smartgrid data series")
fig.show()

Cet plot contient trois subplots, dans la figure, les courbes partagent l'axis-X mais elles possèdent différents axis-Y.
On remarque que les tendances des courbes sont plus évidentes que la figure précédente.

In [30]:
%matplotlib inline
nb_pts=25000
fig = make_subplots(
    rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.02
)

fig.add_trace(go.Scatter(
                x=axisX[0:nb_pts-1],
                y=y_value[0:nb_pts-1],
                name="value",
                line_color='deepskyblue',
                line_width = 2,
                opacity=0.8),
              row=1, col=1)

fig.add_trace(go.Scatter(
                x=axisX[0:nb_pts-1],
                y=y_quality[0:nb_pts-1],
                name="quality",
                line_color='dimgray',
                opacity=0.8),
              row=2, col=1)

fig.update_layout(height=1000, width=1000,
                  title_text="smartGrid data series")
fig.show()

Initialiser visdom et envoyer le plot à visdom via requête Http post

Avant cette étape, il faut lancer le service de visdom dans le terminal.

> visdom

L'address du web UI de visdom sera affichée dans la console.

In [31]:
vis = visdom.Visdom()
vis.plotlyplot(fig, win="mywin3")
Setting up a new session...
Out[31]:
'mywin3'

Ajuster la taille de la fenêtre

In [32]:
vis.update_window_opts(win = "mywin3", opts=dict(width=1200, height=1500))
Out[32]:
'mywin3'

Après l'exécution de ce script, ouvrir le web UI de visdom, le plot est présenté dans une fenêtre intéractive.